feat(go): use load data statement for bulk ingestion #79

shuprime wants to merge 1 commit into adbc-drivers:main
Conversation
Mandukhai-Alimaa left a comment
Great work on this! Just a few things to address.
```go
func (c *mysqlConnectionImpl) executeLoadDataIngest(ctx context.Context, conn *sqlwrapper.LoggingConn, options *driverbase.BulkIngestOptions, stream array.RecordReader) (int64, error) {
	r, w := io.Pipe()
	readerId := loadReaderCounter.Add(1)
	readerName := fmt.Sprintf("adbc_ingest_%s_%d", options.TableName, readerId)
```
This could be a reader-name injection/quoting risk. options.TableName is user-controlled, and if the table name contains quotes or special characters, the query breaks. We can probably just use the counter without the user input (table name).
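A minimal sketch of the suggested fix, stdlib only; `readerName` is a hypothetical helper, and the counter mirrors the driver's existing `loadReaderCounter`:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// loadReaderCounter mirrors the driver's existing counter.
var loadReaderCounter atomic.Int64

// readerName derives the reader handler name from the counter alone, so
// user-controlled table names never reach the 'Reader::...' literal in the
// LOAD DATA statement. (Hypothetical helper name.)
func readerName() string {
	return fmt.Sprintf("adbc_ingest_%d", loadReaderCounter.Add(1))
}

func main() {
	fmt.Println(readerName()) // adbc_ingest_1
}
```

The counter alone is enough to keep concurrent ingests from colliding on handler names.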
```go
	res, err := conn.ExecContext(ctx, query)
	if err != nil {
		return -1, c.ErrorHelper.WrapIO(err, "failed to execute LOAD DATA statement")
	}
```
If ExecContext fails, the function returns but the CSV writer goroutine keeps running and will block forever once the pipe buffer fills, which could leak resources. We can add r.Close() before returning to unblock the writer, and wrap the goroutine with a cancellable context.
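The unblocking behavior this relies on can be shown in isolation with the stdlib; `drainError` is an illustrative demo, not driver code:

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// drainError demonstrates the suggested fix: a goroutine blocked writing to
// an io.Pipe is unblocked as soon as the read end is closed, and the close
// error is surfaced from Write. In the driver this would be r.Close() (or
// r.CloseWithError) on the ExecContext error path.
func drainError() error {
	r, w := io.Pipe()
	done := make(chan error, 1)
	go func() {
		_, err := w.Write([]byte{0}) // blocks: nobody is reading
		done <- err
	}()
	r.CloseWithError(errors.New("exec failed")) // simulate LOAD DATA failing
	return <-done
}

func main() {
	fmt.Println(drainError()) // exec failed
}
```

Using CloseWithError instead of Close also lets the writer goroutine see *why* the ingest stopped.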
```go
	colsList := strings.Join(colNames, ", ")

	query := fmt.Sprintf(
		"LOAD DATA LOCAL INFILE 'Reader::%s' INTO TABLE %s CHARACTER SET utf8mb4 FIELDS TERMINATED BY '\\t' ESCAPED BY '\\\\' LINES TERMINATED BY '\\n' (%s)",
```
CHARACTER SET utf8mb4 makes MySQL treat the incoming stream as UTF-8. If we ever ingest non-UTF8 bytes (or binary-ish data), could this cause conversion issues? Do we need this here, or should we drop it or make it optional?
```go
	rows, rowCount := it.CurrentBatch()

	buf.Reset()
	for rowIdx := 0; rowIdx < rowCount; rowIdx++ {
```
Suggested change:

```diff
-	for rowIdx := 0; rowIdx < rowCount; rowIdx++ {
+	for rowIdx := range rowCount {
```
```go
	buf.Reset()
	for rowIdx := 0; rowIdx < rowCount; rowIdx++ {
		for colIdx := 0; colIdx < numCols; colIdx++ {
```
Suggested change:

```diff
-		for colIdx := 0; colIdx < numCols; colIdx++ {
+		for colIdx := range numCols {
```
```go
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "bool", Type: arrow.FixedWidthTypes.Boolean},
		{Name: "float", Type: arrow.PrimitiveTypes.Float64},
		{Name: "bin", Type: arrow.BinaryTypes.Binary},
	}, nil)
```
Can we add more types to this test to improve coverage? timestamp, date32, int8/16/64, uint32, decimal128, etc.?
What's Changed
This PR optimizes bulk data ingestion in the MySQL driver by implementing support for LOAD DATA LOCAL INFILE.
Technical Details
- Checks `@@local_infile` at the start of ingestion to ensure server support.
- Uses `io.Pipe` and `gomysql.RegisterReaderHandler` to stream data directly. This avoids writing temporary files to disk.
- NULL values are written using MySQL's `\N` marker.
- Falls back to the existing `ExecuteBatchedBulkIngest` logic.

Testing

- Verifies the fallback path when `local_infile` is disabled.

Benchmark
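The pipe-based streaming described under Technical Details can be sketched with the stdlib alone; `streamRows` is illustrative, and in the driver the read end would be handed to `gomysql.RegisterReaderHandler` rather than drained locally:

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// streamRows writes tab-separated rows (NULLs as \N) into one end of an
// io.Pipe from a goroutine while the other end is consumed as the LOAD DATA
// source, so no temporary file ever touches disk.
func streamRows(rows [][]any) string {
	r, w := io.Pipe()
	go func() {
		defer w.Close()
		bw := bufio.NewWriter(w)
		defer bw.Flush() // runs before w.Close (LIFO)
		for _, row := range rows {
			fields := make([]string, len(row))
			for i, v := range row {
				if v == nil {
					fields[i] = `\N` // MySQL's NULL marker
				} else {
					fields[i] = fmt.Sprint(v)
				}
			}
			fmt.Fprintln(bw, strings.Join(fields, "\t"))
		}
	}()
	data, _ := io.ReadAll(r)
	return string(data)
}

func main() {
	fmt.Printf("%q\n", streamRows([][]any{{1, "a"}, {2, nil}})) // "1\ta\n2\t\\N\n"
}
```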
Closes #78